 1.Kafka源码剖析之Topic创建流程
   
   1).Topic创建
   有两种创建方式：自动创建、手动创建。在server.properties中配置auto.create.topics.enable=true 时，
kafka在发现该topic不存在的时候会按照默认配置手动创建topic,触发⾃动创建topic有以下两种情况：
   (1).Producer向某个不存在的Topic写入消息
   (2).Consumer从某个不存在的Topic读取消息
   2).手动创建
   当auto.create.topics.enable=false 时，需要手动创建topic，否则消息会发送失败。手动创建topic的方式
如下：
   bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka_test
--partitions 10  --replication-factor 1
   --replication-factor: 副本数目
   --partitions: 分区数据
   --topic: topic名字
   3).查看Topic入口
   查看脚本文件kafka-topics.sh
   exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
   最终还是调用的TopicCommand 类：首先判断参数是否为空，并且create、list、alter、descibe、delete只允许
存在一个，进行参数验证，创建zookeeper 链接，如果参数中包含create 则开始创建topic，其他情况类似

def main(args: Array[String]): Unit = {
    val opts = new TopicCommandOptions(args)
// 判断参数长度
    if(args.length == 0)
      CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or
change a topic.")
// create、list、alter、descibe、delete只允许存在一个
// should have exactly one action
    val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt,
opts.deleteOpt).count(opts.options.has _)
    if(actions != 1)
       CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly
one action: --list, --describe, --create, --alter or --delete")
// 参数验证
    opts.checkArgs()
// 初始化zookeeper链接
    val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
                          30000,
                          30000,
                          JaasUtils.isZkSecurityEnabled())
    var exitCode = 0
    try {
      if(opts.options.has(opts.createOpt))
	// 创建topic
       createTopic(zkUtils, opts)
     else if(opts.options.has(opts.alterOpt))
// 修改topic
       alterTopic(zkUtils, opts)
     else if(opts.options.has(opts.listOpt))
// 列出所有的topic，bin/kafka-topics.sh --list --zookeeper localhost:2181
       listTopics(zkUtils, opts)
     else if(opts.options.has(opts.describeOpt))
// 查看topic描述，bin/kafka-topics.sh --describe --zookeeper localhost:2181
       describeTopic(zkUtils, opts)
     else if(opts.options.has(opts.deleteOpt))
// 删除topic
       deleteTopic(zkUtils, opts)
    } catch {
      case e: Throwable =>
        println("Error while executing topic command : " + e.getMessage)
        error(Utils.stackTrace(e))
        exitCode = 1
    } finally {
      zkUtils.close()
      Exit.exit(exitCode)
    }
  }
   4).创建Topic
   下面我们主要来看一下createTopic 的执行过程：
def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
// 获取topic名称
    val topic = opts.options.valueOf(opts.topicOpt)
    val configs = parseTopicConfigsToBeAdded(opts)
    val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
    if (Topic.hasCollisionChars(topic))
      println("WARNING: Due to limitations in metric names, topics with a period
('.') or underscore ('_') could collide. To avoid issues it is best to use either,
but not both.")
    try {
//如果客户端指定了topic的partition的replicas分配情况，则直接把所有topic的元数据信息持久化
//写入到zk，
// topic的properties写入到/config/topics/{topic}目录，
// topic的PartitionAssignment写⼊到/brokers/topics/{topic}目录
      if (opts.options.has(opts.replicaAssignmentOpt)) {
         val assignment =
parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
         AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic,
assignment, configs, update = false)
      } else {
		  // 否则需要自动生成topic的PartitionAssignment
          CommandLineUtils.checkRequiredArgs(opts.parser, opts.options,
opts.partitionsOpt, opts.replicationFactorOpt)
// 分区
          val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
// 副本集
          val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
// 从0.10.x版本开始，kafka可以支持指定broker的机架信息，如果指定了机架信息则在副本分配时
// 会尽可能地让分区的副本分不到不同的机架上。
// 指定机架信息是通过kafka的配置文件config/server.properties中的broker.rack参数来配置的
          val rackAwareMode = if (opts.options.has(opts.disableRackAware))
RackAwareMode.Disabled
          else RackAwareMode.Enforced
          AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs,
rackAwareMode)
      }
      println("Created topic \"%s\".".format(topic))
    } catch {
      case e: TopicExistsException => if (!ifNotExists) throw e
    }
  }
    (1).如果客户端指定了topic的partition的replicas分配情况，则直接把所有topic的元数据信息持久化写入到zk，
topic的properties写入到/config/topics/{topic}目录， topic的PartitionAssignment写入到/brokers/topics/{topic}
目录 
    (2).根据分区数量、副本集、是否指定机架来自动生成topic的分区数据
	(3).下面继续来看AdminUtils.createTopic 方法
def createTopic(zkUtils: ZkUtils,
                  topic: String,
                  partitions: Int,
                  replicationFactor: Int,
                  topicConfig: Properties = new Properties,
                  rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
// 获取集群中每个broker的brokerId和机架信息信息的列表，为下面的
    val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
// 根据是否禁用指定机架策略来生成分配策略
    val replicaAssignment =
AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions,
replicationFactor)
// 在zookeeper中创建或更新主题分区分配路径
    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic,
replicaAssignment, topicConfig)
  }
    (4).下面继续来看AdminUtils.assignReplicasToBrokers 方法
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                              nPartitions: Int,
                              replicationFactor: Int,
                              fixedStartIndex: Int = -1,
                              startPartitionId: Int = -1): Map[Int, Seq[Int]]
= {
    if (nPartitions <= 0)
// 分区个数partitions不能小于等于0
        throw new InvalidPartitionsException("Number of partitions must be
larger than 0.")
    if (replicationFactor <= 0)
// 副本个数replicationFactor不能小于等于0
        throw new InvalidReplicationFactorException("Replication factor must be
larger than 0.")
    if (replicationFactor > brokerMetadatas.size)
// 副本个数replicationFactor不能大于broker的节点个数
        throw new InvalidReplicationFactorException(s"Replication factor:
$replicationFactor larger than available brokers: ${brokerMetadatas.size}.")
    if (brokerMetadatas.forall(_.rack.isEmpty))
// 没有指定机架信息的情况
        assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor,
brokerMetadatas.map(_.id), fixedStartIndex,startPartitionId)
    else {
// 针对指定机架信息的情况，更加复杂一点
      if (brokerMetadatas.exists(_.rack.isEmpty))
         throw new AdminOperationException("Not all brokers have rack
information for replica rack aware assignment.")
      assignReplicasToBrokersRackAware(nPartitions, replicationFactor,
brokerMetadatas, fixedStartIndex,
         startPartitionId)
     }
  }
    1).未指定机架策略
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                                replicationFactor:
Int,
                                                brokerList: Seq[Int],
                                                fixedStartIndex: Int,
                                                startPartitionId:
Int): Map[Int, Seq[Int]] = {
     val ret = mutable.Map[Int, Seq[Int]]()
     val brokerArray = brokerList.toArray
     val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else
rand.nextInt(brokerArray.length)
     var currentPartitionId = math.max(0, startPartitionId)
     var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex
else rand.nextInt(brokerArray.length)
     for (_ <- 0 until nPartitions) {
	   if (currentPartitionId > 0 && (currentPartitionId %
brokerArray.length == 0))
         nextReplicaShift += 1
       val firstReplicaIndex = (currentPartitionId + startIndex) %
brokerArray.length
       val replicaBuffer =
mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
       for (j <- 0 until replicationFactor - 1)
         replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex,
nextReplicaShift, j, brokerArray.length))
       ret.put(currentPartitionId, replicaBuffer)
       currentPartitionId += 1
    }
    ret
   }
	遍历每个分区partition然后从brokerArray（brokerId的列表）中选取replicationFactor个
brokerId分配给这个partition.
    创建一个可变的Map用来存放本⽅法将要返回的结果，即分区partition和分配副本的映射关系。
由于fixedStartIndex为-1,所以startIndex是一个随机数，用来计算一个起始分配的brokerId，同
时由于startPartitionId为-1,所以currentPartitionId的值为0，可见默认创建topic时总是从编号为
0 的分区依次轮询进行分配.nextReplicaShift表示下一次副本分配相对于前一次分配的位移量，这
个字面上理解有点绕，不如举个例子：假设集群中有3个broker节点，即代码中的brokerArray，
创建某topic有3个副本和6个分区，那么⾸先从partitionId（partition的编号）为0的分区开始进行
分配，假设第一次计算（由rand.nextInt(brokerArray.length)随机）到nextReplicaShift为1，第
一次随机到的startIndex为2，那么partitionId为0的第一个副本的位置（这⾥指的是brokerArray
的数组下标）firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length =
（0+2）%3 = 2,第二个副本的位置为replicaIndex(firstReplicaIndex, nextReplicaShift, j,
brokerArray.length) = replicaIndex(2, nextReplicaShift+1,0, 3)=?。
    继续计算 replicaIndex(2, nextReplicaShift+1,0, 3) = replicaIndex(2, 2,0, 3)= (2+(1+(2+0)%(3-
1)))%3=0. 继续计算下一个副本的位置replicaIndex(2, 2,1, 3) = (2+(1+(2+1)%(3-1)))%3=1。所以
partitionId为0的副本分配位置列表为[2,0,1]，如果brokerArray正好是从0开始编号，也正好是顺
序不间断的，即brokerArray为[0,1,2]的话，那么当前partitionId为0的副本分配策略为[2,0,1]。如
果brokerId不是从零开始，也不是顺序的（有可能之前集群的其中broker几个下线了），最终的
brokerArray为[2,5,8]，那么partitionId为0的分区的副本分配策略为[8,2,5]。为了便于说明问题，
可以简单的假设brokerArray就是[0,1,2]。
    同样计算下一个分区，即partitionId为1的副本分配策略。此时nextReplicaShift还是为2，没有
满足自增的条件。这个分区的firstReplicaIndex = (1+2)%3=0. 第二个副本的位置
replicaIndex(0,2,0,3) = (0+(1+(2+0)%(3-1)))%3 = 1,第三个副本的位置replicaIndex(0,2,1,3) = 2，
最终partitionId为2的分区分配策略为[0,1,2]
	2).指定机架策略
private def assignReplicasToBrokersRackAware(nPartitions: Int,
                                              replicationFactor: Int,
                                              brokerMetadatas:Seq[BrokerMetadata],
                                              fixedStartIndex: Int,
                                            startPartitionId: Int):
Map[Int, Seq[Int]] = {
    val brokerRackMap = brokerMetadatas.collect { case
BrokerMetadata(id, Some(rack)) =>
      id -> rack
}.toMap
    val numRacks = brokerRackMap.values.toSet.size
    val arrangedBrokerList =
getRackAlternatedBrokerList(brokerRackMap)
    val numBrokers = arrangedBrokerList.size
    val ret = mutable.Map[Int, Seq[Int]]()
    val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else
rand.nextInt(arrangedBrokerList.size)
    var currentPartitionId = math.max(0, startPartitionId)
    var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex
else rand.nextInt(arrangedBrokerList.size)
    for (_ <- 0 until nPartitions) {
       if (currentPartitionId > 0 && (currentPartitionId %
arrangedBrokerList.size == 0))
          nextReplicaShift += 1
       val firstReplicaIndex = (currentPartitionId + startIndex) %
arrangedBrokerList.size
       val leader = arrangedBrokerList(firstReplicaIndex)
       val replicaBuffer = mutable.ArrayBuffer(leader)
       val racksWithReplicas = mutable.Set(brokerRackMap(leader))
       val brokersWithReplicas = mutable.Set(leader)
       var k = 0
       for (_ <- 0 until replicationFactor - 1) {
           var done = false
           while (!done) {
           val broker =
arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift *
numRacks, k, arrangedBrokerList.size))
val rack = brokerRackMap(broker)
// Skip this broker if
// 1. there is already a broker in the same rack that has
// assigned a replica AND there is one or more racks
// that do not have any replica, or
// 2. the broker has already assigned a replica AND there is
// one or more brokers that do not have replica assigned
           if ((!racksWithReplicas.contains(rack) ||
racksWithReplicas.size == numRacks)
                && (!brokersWithReplicas.contains(broker) ||
brokersWithReplicas.size == numBrokers)) {
              replicaBuffer += broker
              racksWithReplicas += rack
              brokersWithReplicas += broker
              done = true
           }
            k += 1
         }
       }
        ret.put(currentPartitionId, replicaBuffer)
        currentPartitionId += 1
    }
     ret
  }
	  (1).assignReplicasToBrokersRackUnaware的执⾏前提是所有的broker都没有配置机架信息，而
assignReplicasToBrokersRackAware的执⾏前提是所有的broker都配置了机架信息，如果出现部分broker配置
了机架信息而另一部分没有配置的话，则会抛出AdminOperationException的异常，如果还想要顺利创建topic
的话，此时需加上“--disable-rack-aware”
	  (2).第一步获得brokerId和rack信息的映射关系列表brokerRackMap,之后调用getRackAlternatedBrokerList()
方法对brokerRackMap做进一步的处理生成一个brokerId的列表。举例：假设目前有3个机架rack1、rack2和rack3，
以及9个broker，
分别对应关系如下：
     rack1: 0, 1, 2
     rack2: 3, 4, 5
     rack3: 6, 7, 8
	  那么经过getRackAlternatedBrokerList()方法处理过后就变成了[0, 3, 6, 1, 4, 7, 2, 5, 8]这
样一个列表，显而易见的这是轮询各个机架上的broker而产生的，之后你可以简单的将这个列表看成是
brokerId的列表，对应assignReplicasToBrokersRackUnaware()方法中的brokerArray，但是其中包含了
简单的机架分配信息。之后的步骤也和未指定机架信息的算法类似，同样包含startIndex、currentPartiionId
, nextReplicaShift的概念，循环为每一个分区分配副本。分配副本时处理第一个副本之外，其余的也调
用replicaIndex方法来获得一个broker，但是这里和assignReplicasToBrokersRackUnaware()不同的是，这
里不是简单的将这个broker添加到当前分区的副本列表之中，还要经过一层的筛选，满足以下任意一个条件
的broker不能被添加到当前分区的副本列表之中：
    1. 如果此broker所在的机架中已经存在一个broker拥有该分区的副本，并且还有其他的机架中没有任何
一个broker拥有该分区的副本。对应代码中的(!racksWithReplicas.contains(rack) || racksWithReplicas.size
 == numRacks)
    2. 如果此broker中已经拥有该分区的副本，并且还有其他broker中没有该分区的副本。对应代码中的
(!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers))


    (5).无论是带机架信息的策略还是不带机架信息的策略，上层调用方法AdminUtils.assignReplicasToBrokers()
最后都是获得一个[Int, Seq[Int]]类型的副本分配列表，其最后作为kafka zookeeper节点/brokers/topics/{topic-name}
节点数据。至此kafka的topic创建就讲解完了，有些同学会感到很疑问，全文通篇(包括上一篇)都是在讲述如何分配
副本，最后得到的也不过是个分配的方案，并没有真正创建这些副本的环节，其实这个观点没有任何问题，对于通过
kafka提供的kafka-topics.sh脚本创建topic的方法来说，它只是提供一个副本的分配方案，并在kafka zookeeper中
创建相应的节点而已。kafka broker的服务会注册监听/brokers/topics/目录下是否有节点变化，如果有新节点创建
就会监听到，然后根据其节点中的数据（即topic的分区副本分配方案）来创建对应的副本。
   
   